package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.Constant;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

//数据流:web/app -> Nginx -> 日志服务器(file) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//程  序:Mock -> Nginx(日志服务器、文件) -> f1.sh -> Kafka(ZK) -> DwdTrafficBaseLogSplit -> Kafka(ZK)
public class Dwd01_TrafficBaseLogSplit {

    public static void main(String[] args) throws Exception {

        //1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallelism for local testing; in production this should match the
        // partition count of the Kafka source topic.
        env.setParallelism(1);

        //1.1 Checkpointing config (disabled for local runs; re-enable for production)
//        env.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        //default is Integer.MAX_VALUE
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        env.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2. Read raw log records from the Kafka ODS topic
        DataStreamSource<String> kafkaDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_ODS_LOG, "traffic_base_log_230315"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

        //3. Parse each record to a JSONObject; route unparseable/blank records to a dirty side output
        OutputTag<String> dirtyTag = new OutputTag<String>("dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObj = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // Guard against null/blank payloads: JSON.parseObject(null) and
                // parseObject("   ") both return null, and collecting null would
                // crash the operator instead of isolating the bad record.
                if (value == null || value.trim().isEmpty()) {
                    return;
                }
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    if (jsonObject != null) {
                        out.collect(jsonObject);
                    } else {
                        ctx.output(dirtyTag, value);
                    }
                } catch (JSONException e) {
                    // Malformed JSON: keep the raw text for offline inspection.
                    ctx.output(dirtyTag, value);
                }
            }
        });
        jsonObj.getSideOutput(dirtyTag).print("dirtyTag>>>>>");

        //4. Key by device id (mid) and use keyed state to validate the new-visitor flag.
        //   is_new == "1" is only trusted if this mid has never been seen, or was first
        //   seen today; otherwise it is corrected to "0".
        KeyedStream<JSONObject, String> keyedStream = jsonObj.keyBy(json -> json.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {

            // Per-mid first-visit date ("yyyy-MM-dd"); null means never seen.
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit-dt", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {

                String isNew = value.getJSONObject("common").getString("is_new");
                String lastDt = lastVisitState.value();
                String curDt = DateFormatUtil.toDate(value.getLong("ts"));

                if ("1".equals(isNew)) {
                    if (lastDt == null || lastDt.equals(curDt)) {
                        // First ever visit, or repeat visit on the first day: flag is valid.
                        lastVisitState.update(curDt);
                    } else {
                        // Seen on an earlier day: the client-side flag lied, correct it.
                        // Note: must be the String "0" to stay consistent with the
                        // String "1"/"0" values the field carries everywhere else.
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else { // is_new == "0"
                    if (lastDt == null) {
                        // Old visitor whose state was lost (or predates this job):
                        // backfill a sentinel date so the flag is never "promoted" later.
                        lastVisitState.update("1970-01-01");
                    }
                }

                return value;
            }
        });

        //5. Split into one main stream and four side streams:
        //   page -> main stream; start / display / action / error -> side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("error") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, ProcessFunction<JSONObject, String>.Context ctx, Collector<String> out) throws Exception {

                // Error info may be attached to any log type: emit the full record
                // to the error stream, then strip it from the remaining payload.
                String err = value.getString("err");
                if (err != null) {
                    ctx.output(errorTag, value.toJSONString());
                    value.remove("err");
                }

                // A record with a "start" field is an app-launch log; everything
                // else is treated as a page log.
                String start = value.getString("start");
                if (start != null) {
                    ctx.output(startTag, value.toJSONString());
                } else { // page log

                    JSONObject page = value.getJSONObject("page");
                    JSONObject common = value.getJSONObject("common");
                    Long ts = value.getLong("ts");

                    // Fan out embedded display (exposure) entries, enriching each
                    // with the shared page/common/ts context.
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page", page);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                        value.remove("displays");
                    }

                    // Fan out embedded action entries (each action carries its own ts).
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null) {
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page", page);
                            ctx.output(actionTag, action.toJSONString());
                        }
                        value.remove("actions");
                    }

                    // Emit the stripped page log on the main stream.
                    out.collect(value.toJSONString());
                }
            }
        });

        //6. Write every stream to its DWD Kafka topic
        SideOutputDataStream<String> startDS = pageDS.getSideOutput(startTag);
        SideOutputDataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        SideOutputDataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        SideOutputDataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        pageDS.print("pageDS>>>>>");
        startDS.print("startDS>>>>>>");
        displayDS.print("displayDS>>>>");
        actionDS.print("actionDS>>>>>");
        errorDS.print("errorDS>>>>>");

        pageDS.sinkTo(KafkaUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_PAGE));
        startDS.sinkTo(KafkaUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_START));
        displayDS.sinkTo(KafkaUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_DISPLAY));
        actionDS.sinkTo(KafkaUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ACTION));
        //errorDS.sinkTo(KafkaUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ERR));
        // Error sink uses an explicit serialization schema variant of getKafkaSink.
        errorDS.sinkTo(KafkaUtil.getKafkaSink(new KafkaRecordSerializationSchema<String>() {
            @Nullable
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                return new ProducerRecord<>(Constant.TOPIC_DWD_TRAFFIC_ERR, element.getBytes());
            }
        }));

        //7. Launch the job
        env.execute("DwdTrafficBaseLogSplit");

    }
}
